package com.tpvlog.dfs.client;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import java.io.File;

/**
 * Client-side implementation of {@link FileSystem}.
 *
 * <p>Metadata operations are delegated to the NameNode through
 * {@link NameNodeRpcClient}; file contents are sent to and read from
 * DataNodes through {@code DFSNIOClient}.
 *
 * @author Ressmix
 */
public class FileSystemImpl implements FileSystem {

    private final NameNodeRpcClient rpcClient = new NameNodeRpcClient();

    /**
     * Creates a directory by delegating to the NameNode RPC client.
     *
     * @param path directory path to create
     * @throws Exception if the RPC call fails
     */
    public void mkdir(String path) throws Exception {
        rpcClient.mkdir(path);
    }

    /**
     * Uploads a file: registers it with the NameNode, asks for a list of
     * DataNodes and sends the bytes to each of them in turn.
     *
     * <p>If sending to one DataNode fails, the failed node is reported to the
     * NameNode as {@code ip-hostname}, a replacement node is requested and the
     * send is retried once against that replacement.
     *
     * @param file     file content
     * @param filename file name; the platform file separator is prepended
     *                 when it is missing
     * @param fileSize file size passed on to the NameNode and the DataNodes
     * @return {@code false} if the NameNode refuses to create the file or
     *         allocates no DataNode; {@code true} once every allocated
     *         DataNode (or its replacement) has been sent the file
     * @throws IllegalStateException if a replacement DataNode is needed but
     *                               the NameNode does not return one
     * @throws Exception             if an RPC call fails or the retry against
     *                               the replacement DataNode fails
     */
    public Boolean upload(byte[] file, String filename, long fileSize) throws Exception {
        // 1. Send the file metadata to the NameNode.
        // NOTE(review): File.separator is platform-dependent ("\" on Windows);
        // confirm whether the NameNode expects "/" regardless of client OS.
        if (!filename.startsWith(File.separator)) {
            filename = File.separator + filename;
        }
        if (!rpcClient.createFile(filename)) {
            return false;
        }

        // 2. Ask the NameNode which DataNodes should receive the file.
        String datanodesJson = rpcClient.allocateDataNodes(filename, fileSize, "");
        System.out.println(datanodesJson);
        if (datanodesJson == null) {
            return false;
        }

        // 3. Send the file to each allocated DataNode.
        JSONArray datanodes = JSONArray.parseArray(datanodesJson);
        if (datanodes == null || datanodes.isEmpty()) {
            // Nothing was allocated, so no copy of the file would be stored;
            // reporting success here would be misleading.
            return false;
        }
        for (int i = 0; i < datanodes.size(); i++) {
            JSONObject datanode = datanodes.getJSONObject(i);
            String hostname = datanode.getString("hostname");
            String ip = datanode.getString("ip");
            int nioPort = datanode.getIntValue("nioPort");
            try {
                DFSNIOClient.sendFile(hostname, nioPort, file, filename, fileSize);
            } catch (Exception ex) {
                ex.printStackTrace();
                // Report the failed DataNode and ask for a replacement.
                String reallocated = rpcClient.allocateDataNodes(filename, fileSize, ip + "-" + hostname);
                JSONObject replacement = parseDataNode(reallocated, filename);
                DFSNIOClient.sendFile(replacement.getString("hostname"),
                        replacement.getIntValue("nioPort"), file, filename, fileSize);
            }
        }
        return true;
    }

    /**
     * Downloads a file from a DataNode chosen by the NameNode.
     *
     * <p>If the first read fails, the failed node is reported to the NameNode
     * as {@code ip-hostname}, another node is requested and the read is
     * retried once. When the retry fails as well, its exception is thrown
     * with the first failure attached as a suppressed exception.
     *
     * @param filename name of the file to download
     * @return the bytes returned by {@code DFSNIOClient.readFile}
     * @throws IllegalStateException if the NameNode does not return a DataNode
     * @throws Exception             if an RPC call fails or both read attempts fail
     */
    public byte[] download(String filename) throws Exception {
        // 1. Ask the NameNode for a DataNode holding the file.
        String datanode = rpcClient.getDataNodeForFile(filename, "");
        System.out.println("NameNode分配用来下载文件的数据节点：" + datanode);

        // 2. Parse the DataNode description.
        JSONObject jsonObject = parseDataNode(datanode, filename);
        String hostname = jsonObject.getString("hostname");
        String ip = jsonObject.getString("ip");
        Integer nioPort = jsonObject.getInteger("nioPort");

        // 3. Read the file from the DataNode.
        try {
            return DFSNIOClient.readFile(hostname, nioPort, filename);
        } catch (Exception ex) {
            try {
                // Report the failed DataNode and retry once on another one.
                datanode = rpcClient.getDataNodeForFile(filename, ip + "-" + hostname);
                jsonObject = parseDataNode(datanode, filename);
                hostname = jsonObject.getString("hostname");
                nioPort = jsonObject.getInteger("nioPort");
                return DFSNIOClient.readFile(hostname, nioPort, filename);
            } catch (Exception e2) {
                // Keep the first failure visible to whoever handles the second.
                e2.addSuppressed(ex);
                throw e2;
            }
        }
    }

    /**
     * Shuts down the underlying NameNode RPC client.
     *
     * @throws Exception if the RPC client fails to shut down
     */
    public void shutdown() throws Exception {
        rpcClient.shutdown();
    }

    /**
     * Parses a single DataNode description returned by the NameNode.
     *
     * @param json     JSON text returned by the NameNode, possibly {@code null}
     * @param filename file the DataNode was requested for, used in the error message
     * @return the parsed DataNode object, never {@code null}
     * @throws IllegalStateException if the text parses to no object
     */
    private static JSONObject parseDataNode(String json, String filename) {
        JSONObject node = JSONObject.parseObject(json);
        if (node == null) {
            throw new IllegalStateException(
                    "NameNode returned no usable DataNode for file: " + filename);
        }
        return node;
    }
}
